eb693bdf8ffe69b548efe91cb9e8a027898dfb19,google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/SubscriberImplTest.java,SubscriberImplTest,testStreamAckDeadlineUpdate,#,308

Before Change


        getTestSubscriberBuilder(testReceiver)
            .setAckExpirationPadding(Duration.standardSeconds(1))
            .build();
    subscriber.startAsync().awaitRunning();

    fakeSubscriberServiceImpl.waitForStreamAckDeadline(10);

After Change



  /**
   * Drives a streaming subscriber through two ack-latency phases and waits for the fake service
   * to observe the stream ack deadline at each step: 10 initially, 20 after a single message is
   * acked 20 seconds late, and 10 again after 999 further messages are acked 10 seconds late.
   *
   * <p>Returns immediately when the test is not running in streaming mode.
   *
   * @throws Exception if any of the subscriber or fake-service calls fail
   */
  @Test
  public void testStreamAckDeadlineUpdate() throws Exception {
    if (!isStreamingTest) {
      // Only meaningful for streaming pull; nothing to verify when polling.
      return;
    }

    // Number of follow-up messages used to pull the ack latency back down.
    final int followUpMessageCount = 999;
    // Time advanced on the fake executor before each deadline update is awaited.
    final Duration deadlineUpdateWait = Duration.standardSeconds(60);

    Subscriber streamingSubscriber =
        startSubscriber(
            getTestSubscriberBuilder(testReceiver)
                .setAckExpirationPadding(Duration.standardSeconds(1)));

    // Deadline expected right after the subscriber starts.
    fakeSubscriberServiceImpl.waitForStreamAckDeadline(10);

    // Phase 1: a single message whose reply is held back for 20 seconds.
    testReceiver.setExplicitAck(true);
    sendMessages(ImmutableList.of("A"));
    fakeExecutor.advanceTime(Duration.standardSeconds(20));
    testReceiver.replyNextOutstandingMessage();

    // Let enough fake time pass for an ack deadline update, then expect 20.
    fakeExecutor.advanceTime(deadlineUpdateWait);
    fakeSubscriberServiceImpl.waitForStreamAckDeadline(20);

    // Phase 2: many messages, each replied to after only 10 seconds, intended to bring the
    // 99th-percentile ack latency of the receiver back down to 10 seconds.
    testReceiver.setExplicitAck(true);
    int nextMessageId = 0;
    while (nextMessageId < followUpMessageCount) {
      sendMessages(ImmutableList.of(String.valueOf(nextMessageId)));
      nextMessageId++;
    }

    fakeExecutor.advanceTime(Duration.standardSeconds(10));
    for (int pendingReplies = followUpMessageCount; pendingReplies > 0; pendingReplies--) {
      testReceiver.replyNextOutstandingMessage();
    }

    // Let enough fake time pass for another ack deadline update, then expect 10 again.
    fakeExecutor.advanceTime(deadlineUpdateWait);
    fakeSubscriberServiceImpl.waitForStreamAckDeadline(10);

    streamingSubscriber.stopAsync().awaitTerminated();
  }

  @Test